Add Oracle UPSERT #645

Open
psainics wants to merge 8 commits into data-integrations:develop from cloudsufi:orc-upsert

Conversation

@psainics
Contributor

This pull request introduces support for upsert operations in the Oracle batch sink plugin.

  • Added a new class OracleETLDBOutputFormat that extends ETLDBOutputFormat and implements the constructUpsertQuery method, generating an Oracle-compatible MERGE-based upsert SQL statement.
  • Updated OracleSink to use the new OracleETLDBOutputFormat by adding an output context with SinkOutputFormatProvider.
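
For illustration, a MERGE-based upsert of the kind described above could be assembled roughly as follows. This is a minimal sketch, not the code from this pull request: the class name `OracleMergeSketch`, the method `buildMerge`, the aliases `t` and `s`, and the use of unquoted identifiers are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch of building an Oracle MERGE upsert; not the PR's implementation. */
final class OracleMergeSketch {

  static String buildMerge(String table, String[] fieldNames, String[] keyNames) {
    List<String> fields = Arrays.asList(fieldNames);
    List<String> keys = Arrays.asList(keyNames);

    // One bind parameter per field, exposed as a single-row source via DUAL.
    String source = fields.stream()
        .map(f -> "? AS " + f)
        .collect(Collectors.joining(", "));

    // A target row matches when every key column is equal.
    String on = keys.stream()
        .map(k -> "t." + k + " = s." + k)
        .collect(Collectors.joining(" AND "));

    // Oracle rejects updates to columns referenced in the ON clause (ORA-38104),
    // so only non-key columns go into the SET list.
    String set = fields.stream()
        .filter(f -> !keys.contains(f))
        .map(f -> "t." + f + " = s." + f)
        .collect(Collectors.joining(", "));

    String insertCols = String.join(", ", fields);
    String insertVals = fields.stream()
        .map(f -> "s." + f)
        .collect(Collectors.joining(", "));

    StringBuilder sql = new StringBuilder()
        .append("MERGE INTO ").append(table).append(" t")
        .append(" USING (SELECT ").append(source).append(" FROM dual) s")
        .append(" ON (").append(on).append(")");
    // If every field is a key there is nothing to update, so the clause is omitted.
    if (!set.isEmpty()) {
      sql.append(" WHEN MATCHED THEN UPDATE SET ").append(set);
    }
    sql.append(" WHEN NOT MATCHED THEN INSERT (").append(insertCols).append(")")
        .append(" VALUES (").append(insertVals).append(")");
    return sql.toString();
  }

  public static void main(String[] args) {
    System.out.println(
        buildMerge("users", new String[] {"id", "name"}, new String[] {"id"}));
  }
}
```

For the call in `main`, the sketch produces `MERGE INTO users t USING (SELECT ? AS id, ? AS name FROM dual) s ON (t.id = s.id) WHEN MATCHED THEN UPDATE SET t.name = s.name WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name)`. In this form each field is bound exactly once, in `fieldNames` order.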

UI Update

image

@psainics psainics requested a review from prince-cs January 30, 2026 03:09
@psainics psainics self-assigned this Jan 30, 2026
@psainics psainics added the build label Jan 30, 2026
Contributor

@prince-cs prince-cs left a comment

LGTM

@psainics psainics requested a review from prince-cs February 19, 2026 04:35
@psainics psainics marked this pull request as ready for review February 19, 2026 04:35

@Override
public String constructUpsertQuery(String table, String[] fieldNames, String[] listKeys) {
  if (listKeys == null) {
    throw new IllegalArgumentException("Column names to be updated should not be null");

Contributor

@itsankit-google itsankit-google Feb 23, 2026

What is the name of this config in the UI?

"Column names" does not make it clear which config we are talking about.

Also, can we move these checks to the sink config validations that run when UPSERT is selected?

Contributor Author

Yes, we also have this validation in the Abstract DB sink:

collector.addFailure(
    "Table key must be set if the operation is 'Update' or 'Upsert'.", null)
  .withConfigProperty(RELATION_TABLE_KEY).withConfigProperty(OPERATION_NAME);

Contributor Author

The naming convention is taken from ETLDBOutputFormat and PostgresETLDBOutputFormat.

Contributor

If we have these validations, how could the code ever reach this point without these properties set?

if (listKeys == null) {
  throw new IllegalArgumentException("Column names to be updated should not be null");
} else if (fieldNames == null) {
  throw new IllegalArgumentException("Field names should not be null");

Contributor

@itsankit-google itsankit-google Feb 23, 2026

Similar comment here.

Better to move these to sink config validations to fail early.
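
As a rough illustration of failing early, the check could run against the sink configuration before any query is built. The sketch below is self-contained and deliberately does not use the plugin's real collector API: `UpsertConfigValidationSketch`, `validate`, and the plain list of messages are hypothetical stand-ins, and only the failure message is taken from this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a deploy-time config check; not the plugin's validation code. */
final class UpsertConfigValidationSketch {

  /** Returns human-readable failures; an empty list means the config is usable. */
  static List<String> validate(String operationName, String relationTableKey) {
    List<String> failures = new ArrayList<>();

    // Only Update and Upsert need a key to match existing rows on.
    boolean needsKey = "update".equalsIgnoreCase(operationName)
        || "upsert".equalsIgnoreCase(operationName);
    boolean keyMissing = relationTableKey == null || relationTableKey.trim().isEmpty();

    if (needsKey && keyMissing) {
      failures.add("Table key must be set if the operation is 'Update' or 'Upsert'.");
    }
    return failures;
  }

  public static void main(String[] args) {
    // A missing key is reported for upsert but not for insert.
    System.out.println(validate("upsert", null));
    System.out.println(validate("insert", null));
  }
}
```

Collecting failures in a list, instead of throwing on the first one, is an assumption of the sketch; it lets all configuration problems be reported together.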

Contributor Author

@psainics psainics Feb 27, 2026

This is the input schema.

Contributor

So these validations should move to the source.

@Override
protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
                                String fieldName, int fieldIndex) throws SQLException {
  int sqlType = columnTypes.get(fieldIndex).getType();

Contributor

In which case can fieldIndex be greater than columnTypes.size() - 1?

Contributor Author

The logic in the common class adds the columns to be updated first and then the columns to match on. The query can therefore reference the same column more than once, so the field index can exceed the total number of columns.

int sqlType = Types.OTHER;
// avoid OOB exception in case of mismatch between columnTypes and record schema fields
for (ColumnType columnType : columnTypes) {
  if (columnType.getName().equals(fieldName)) {

Contributor

Should we be using equalsIgnoreCase, similar to this lookup?

Schema.Field field = record.getSchema().getField(columnType.getName(), true);

Contributor Author

No, Oracle has case-sensitive column names.

      String fieldName, int fieldIndex) throws SQLException {
-   int sqlType = columnTypes.get(fieldIndex).getType();
    int sqlIndex = fieldIndex + 1;
+   int sqlType = Types.OTHER;

Contributor

Should we throw an error if fieldName matches none of the columnType.getName() values?

Why are we assuming Types.OTHER when there is no match?

Contributor Author

It will fail for Types.OTHER, but throwing an error sounds like the better option.
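
One way the name-based lookup with an explicit error might look is sketched below. It is an illustration only: `SqlTypeLookupSketch`, `Column`, and `resolveSqlType` are hypothetical names, and `Column` is a simplified stand-in for the plugin's `ColumnType`, holding just a column name and a `java.sql.Types` code.

```java
import java.sql.Types;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of resolving a SQL type by column name; not the PR's code. */
final class SqlTypeLookupSketch {

  /** Simplified stand-in for the plugin's ColumnType. */
  static final class Column {
    final String name;
    final int sqlType;

    Column(String name, int sqlType) {
      this.name = name;
      this.sqlType = sqlType;
    }
  }

  /** Looks the type up by name rather than by index, so a longer parameter list cannot overrun. */
  static int resolveSqlType(List<Column> columns, String fieldName) {
    for (Column column : columns) {
      // Exact, case-sensitive comparison, following the author's reply in this thread.
      if (column.name.equals(fieldName)) {
        return column.sqlType;
      }
    }
    // Fail with a clear message instead of silently falling back to Types.OTHER.
    throw new IllegalArgumentException(
        "No column named '" + fieldName + "' found in the table's column types");
  }

  public static void main(String[] args) {
    List<Column> columns = Arrays.asList(
        new Column("ID", Types.NUMERIC),
        new Column("NAME", Types.VARCHAR));
    System.out.println(resolveSqlType(columns, "NAME") == Types.VARCHAR);
  }
}
```

A linear scan is used here for brevity; building a name-to-type map once per writer would avoid repeating the scan for every field of every record.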

3 participants